From ba2b1e89ac94ed0a61c5846e91f9d945625e0964 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Fri, 30 Mar 2018 21:43:27 +0200 Subject: [PATCH] Fixed shortcuts --- grammar/grammar.py | 2 +- src/siri/db/aggregate.c | 9 +- src/siri/db/series.c | 39 ++++++--- src/siri/parser/listener.c | 170 ++++++++++++------------------------- 4 files changed, 86 insertions(+), 134 deletions(-) diff --git a/grammar/grammar.py b/grammar/grammar.py index ccfffb83..2660cd0d 100644 --- a/grammar/grammar.py +++ b/grammar/grammar.py @@ -403,7 +403,7 @@ class SiriGrammar(Grammar): uuid = Choice(r_uuid_str, string, most_greedy=False) group_match = Repeat(r_grave_str, 1, 1) series_match = List( - Choice(series_name, group_match, series_re, most_greedy=False), + Choice(Token('*'), series_name, group_match, series_re, most_greedy=False), series_sep, 1) limit_expr = Sequence(k_limit, int_expr) diff --git a/src/siri/db/aggregate.c b/src/siri/db/aggregate.c index fea468a1..62003607 100644 --- a/src/siri/db/aggregate.c +++ b/src/siri/db/aggregate.c @@ -459,13 +459,14 @@ void siridb_aggregate_list_free(slist_t * alist) */ int siridb_aggregate_can_skip(cleri_children_t * children) { - switch (children->node->children->node->cl_obj->gid) + LOGC("Gid: %d", children->node->children->node->children->node->children->node->cl_obj->gid); + switch (children->node->children->node->children->node->children->node->cl_obj->gid) { case CLERI_GID_F_COUNT: case CLERI_GID_F_FIRST: case CLERI_GID_F_LAST: return \ - children->node->children->node->children->next->next->next == NULL; + children->node->children->node->children->node->children->node->children->next->next->next == NULL; default: return 0; @@ -1562,7 +1563,7 @@ static int aggr_first( #if DEBUG assert (points->len); #endif - siridb_point_t * source = points->data[0]; + siridb_point_t * source = points->data; switch (points->tp) { @@ -1598,7 +1599,7 @@ static int aggr_last( #if DEBUG assert (points->len); #endif - siridb_point_t * source = points->data[points->len - 1]; + siridb_point_t * source = points->data + (points->len - 1); switch (points->tp) { diff --git a/src/siri/db/series.c b/src/siri/db/series.c index d2084949..5936f1be 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -795,15 +795,12 @@ void siridb__series_decref(siridb_series_t * series) siridb_points_t * siridb_series_get_first( siridb_series_t * series, int * required_shard) { - siridb_point_t * point; siridb_points_t * buf = series->buffer; siridb_points_t * points; - *required_shard = 0; - if (buf != NULL && buf->len && - (points->data = buf->data[0])->ts == series->start) + buf->data->ts == series->start) { points = siridb_points_new(1, series->tp); if (points == NULL) @@ -812,10 +809,15 @@ siridb_points_t * siridb_series_get_first( } /* string type does not have a buffer so we don't have to worry */ + points->data->ts = buf->data->ts; + points->data->val = buf->data->val; points->len = 1; + LOGC("First from memory"); return points; } - *required_shard = 1; + LOGC("First from shard"); + + (*required_shard)++; /* if not in the buffer, then if must be in a shard */ assert (series->idx_len); @@ -823,12 +825,16 @@ siridb_points_t * siridb_series_get_first( idx_t * first = series->idx; points = siridb_points_new(first->len, series->tp); + if (points == NULL) + { + return NULL; + } siridb_shard_get_points_callback(first->shard->flags, series)( points, first, NULL, - series->start, + &series->start + 1, series->flags & SIRIDB_SERIES_HAS_OVERLAP); assert (points->len); @@ -848,15 +854,13 @@ siridb_points_t * siridb_series_get_first( siridb_points_t * siridb_series_get_last( siridb_series_t * series, int * required_shard) { - siridb_point_t * point; siridb_points_t * buf = series->buffer; siridb_points_t * points; - - *required_shard = 0; + siridb_point_t * point; if (buf != NULL && buf->len && - (points->data = buf->data[buf->len - 1])->ts == series->end) + (point = buf->data + (buf->len - 1))->ts == series->end) { points = siridb_points_new(1, series->tp); if (points == NULL) @@ -865,10 +869,16 @@ siridb_points_t * siridb_series_get_last( } /* string type does not have a buffer so we don't have to worry */ + points->data->ts = buf->data->ts; + points->data->val = buf->data->val; points->len = 1; + LOGC("Last from memory"); return points; } - *required_shard = 1; + + LOGC("Last from shard"); + + (*required_shard)++; /* if not in the buffer, then if must be in a shard */ assert (series->idx_len); @@ -886,11 +896,15 @@ siridb_points_t * siridb_series_get_last( } points = siridb_points_new(last->len, series->tp); + if (points == NULL) + { + return NULL; + } siridb_shard_get_points_callback(last->shard->flags, series)( points, last, - last->end_ts, + &last->end_ts, NULL, series->flags & SIRIDB_SERIES_HAS_OVERLAP); @@ -917,6 +931,7 @@ siridb_points_t * siridb_series_get_count(siridb_series_t * series) points->data->val.int64 = series->length; points->len = 1; } + LOGC("Count from memory"); return points; } diff --git a/src/siri/parser/listener.c b/src/siri/parser/listener.c index 7cab55c5..3ea5f261 100644 --- a/src/siri/parser/listener.c +++ b/src/siri/parser/listener.c @@ -43,8 +43,8 @@ #include -#define MAX_ITERATE_COUNT 10000 // ten-thousand -#define SKIP_GET_POINTS -1 +#define MAX_ITERATE_COUNT 10000 // ten-thousand +#define MAX_BATCH_REQUIRE_SHARD 100 // after reading 100 shards, iterate #define QP_ADD_SUCCESS qp_add_raw( \ query->packer, (const unsigned char *) "success_msg", 11); @@ -1025,11 +1025,13 @@ static void enter_select_stmt(uv_async_t * handle) /* child is always the ',' and child->next the node */ child = query->nodes->node->children->next->node->children; + skip_get_points = siridb_aggregate_can_skip(child); + LOGC("Skip?... %d", skip_get_points); child = child->next; while (child != NULL) { - if (skip_get_points && !siridb_aggregate_can_skip(child)) + if (skip_get_points && !siridb_aggregate_can_skip(child->next)) { skip_get_points = 0; } @@ -1039,6 +1041,7 @@ static void enter_select_stmt(uv_async_t * handle) if (skip_get_points) { + LOGC("Set skip... %d", skip_get_points); q_select->flags |= QUERIES_SKIP_GET_POINTS; } @@ -3225,6 +3228,8 @@ static void exit_series_match(uv_async_t * handle) q_select->points_map = imap_new(); } + LOGC("exit series match %zu (%u)", q_select->nselects, q_select->flags); + SIRIPARSER_ASYNC_NEXT_NODE } @@ -4509,27 +4514,17 @@ static void async_no_points_aggregate(uv_async_t * handle) siridb_series_t * series; siridb_points_t * points; siridb_points_t * aggr_points; - int required_shard; - - if (q_select->n > siridb->select_points_limit) - { - snprintf(query->err_msg, - SIRIDB_MAX_SIZE_ERR_MSG, - "Query has reached the maximum number of selected points " - "(%u). Please use another time window, an aggregation " - "function or select less series to reduce the number of " - "points.", - siridb->select_points_limit); - - siridb_query_send_error(handle, CPROTO_ERR_QUERY); - return; - } - - uv_mutex_lock(&siridb->series_mutex); + int required_shard = 0; for (; q_select->slist_index < q_select->slist->len; ++q_select->slist_index) { + if (required_shard > MAX_BATCH_REQUIRE_SHARD) + { + async_more = 1; + break; + } + series = (siridb_series_t *) q_select->slist->data[q_select->slist_index]; /* @@ -4544,6 +4539,9 @@ static void async_no_points_aggregate(uv_async_t * handle) #endif siridb_aggr_t * aggr = q_select->alist->data[0]; + + uv_mutex_lock(&siridb->series_mutex); + switch (aggr->gid) { case CLERI_GID_F_COUNT: @@ -4558,8 +4556,13 @@ static void async_no_points_aggregate(uv_async_t * handle) default: assert (0); } + + uv_mutex_unlock(&siridb->series_mutex); + if (points != NULL) { + const char * name; + for (size_t i = 1; points->len && i < q_select->alist->len; i++) { aggr_points = siridb_aggregate_run( @@ -4582,111 +4585,44 @@ static void async_no_points_aggregate(uv_async_t * handle) } q_select->n += points->len; - } - } - - uv_mutex_unlock(&siridb->series_mutex); - - - - - /* We try to read the points from the cache in case a cache is created. - * If there are more select functions left we create a copy of the cache. - * When this is the last select function we pop from the cache since the - * points are no longer required. - */ - points = (q_select->points_map == NULL) ? - NULL : - q_select->nselects ? - siridb_points_copy(imap_get(q_select->points_map, series->id)): - imap_pop(q_select->points_map, series->id); - - if (points == NULL) - { - uv_mutex_lock(&siridb->series_mutex); - - points = (series->flags & SIRIDB_SERIES_IS_DROPPED) ? - NULL : siridb_series_get_points( - series, - q_select->start_ts, - q_select->end_ts); - uv_mutex_unlock(&siridb->series_mutex); - - /* when having a cache and points, add a copy of points to the cache */ - if (q_select->points_map != NULL && points != NULL) - { - siridb_points_t * cpoints = siridb_points_copy(points); - if (cpoints != NULL && - imap_add(q_select->points_map, series->id, cpoints)) - { - siridb_points_free(cpoints); - } - } - } - - if (points != NULL) - { - const char * name; - for (size_t i = 0; points->len && i < q_select->alist->len; i++) - { - aggr_points = siridb_aggregate_run( - points, - (siridb_aggr_t *) q_select->alist->data[i], - query->err_msg); - - if (aggr_points != points) + if (q_select->merge_as == NULL) { - siridb_points_free(points); - } + name = siridb_presuf_name( + q_select->presuf, + series->name, + series->name_len); - if (aggr_points == NULL) - { - siridb_query_send_error(handle, CPROTO_ERR_QUERY); - return; + if (name == NULL || ct_add(q_select->result, name, points)) + { + sprintf(query->err_msg, "Error adding points to map."); + siridb_points_free(points); + log_critical("Critical error adding points"); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } } - - points = aggr_points; - } - - q_select->n += points->len; - - if (q_select->merge_as == NULL) - { - name = siridb_presuf_name( - q_select->presuf, - series->name, - series->name_len); - - if (name == NULL || ct_add(q_select->result, name, points)) + else { - sprintf(query->err_msg, "Error adding points to map."); - siridb_points_free(points); - log_critical("Critical error adding points"); - siridb_query_send_error(handle, CPROTO_ERR_QUERY); - return; - } - } - else - { - slist_t ** plist; + slist_t ** plist; - name = siridb_presuf_name( - q_select->presuf, - q_select->merge_as, - strlen(q_select->merge_as)); + name = siridb_presuf_name( + q_select->presuf, + q_select->merge_as, + strlen(q_select->merge_as)); - plist = (slist_t **) ct_getaddr(q_select->result, name); + plist = (slist_t **) ct_getaddr(q_select->result, name); - if ( name == NULL || - plist == NULL || - slist_append_safe(plist, points)) - { - sprintf(query->err_msg, "Error adding points to map."); - siridb_points_free(points); - log_critical("Critical error adding points"); - siridb_query_send_error(handle, CPROTO_ERR_QUERY); - return; + if ( name == NULL || + plist == NULL || + slist_append_safe(plist, points)) + { + sprintf(query->err_msg, "Error adding points to map."); + siridb_points_free(points); + log_critical("Critical error adding points"); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } } } } -- 2.30.2